# Copyright © 2024 Pathway

from __future__ import annotations

from typing import Iterable, Literal

from pathway.internals import api, datasink, datasource
from pathway.internals.expression import ColumnReference
from pathway.internals.runtime_type_check import check_arg_types
from pathway.internals.schema import Schema
from pathway.internals.table import Table
from pathway.internals.table_io import table_from_datasource
from pathway.internals.trace import trace_user_frame
from pathway.io._utils import (
    MessageQueueOutputFormat,
    _get_unique_name,
    check_raw_and_plaintext_only_kwargs_for_message_queues,
    construct_schema_and_data_format,
)


@check_arg_types
@trace_user_frame
def read(
    uri: str,
    topic: str,
    *,
    schema: type[Schema] | None = None,
    format: Literal["plaintext", "raw", "json"] = "raw",
    autocommit_duration_ms: int | None = 1500,
    json_field_paths: dict[str, str] | None = None,
    jetstream_stream_name: str | None = None,
    durable_consumer_name: str | None = None,
    parallel_readers: int | None = None,
    name: str | None = None,
    max_backlog_size: int | None = None,
    debug_data=None,
    **kwargs,
) -> Table:
    """Reads data from a specified NATS topic.

    It supports three formats: ``"plaintext"``, ``"raw"``, and ``"json"``.

    For the ``"raw"`` format, the payload is read as raw bytes and added directly to the
    table. In the ``"plaintext"`` format, the payload is decoded from UTF-8 and stored as
    plain text. In both cases, the table will have an autogenerated primary key and a
    single ``"data"`` column representing the payload.

    If you select the ``"json"`` format, the connector parses the message payload as JSON
    and creates table columns based on the schema provided in the ``schema`` parameter. The
    column values come from the corresponding JSON fields.

    The JetStream extension is supported. To read a NATS topic from a JetStream stream,
    specify the ``jetstream_stream_name`` parameter. When this parameter is provided,
    Pathway will use the given stream to read data.

    You can also use your own durable JetStream pull consumer if you already have one.
    To do this, specify the ``durable_consumer_name`` parameter. Otherwise, Pathway will
    automatically create a durable consumer for you with an auto-generated name. This
    name will remain the same as long as you do not change your set of input sources.

    Args:
        uri: The URI of the NATS server.
        topic: The name of the NATS topic to read data from.
        schema: The table schema, used only when the format is set to ``"json"``.
        format: The input data format, which can be ``"raw"``, ``"plaintext"``, or
            ``"json"``.
        autocommit_duration_ms: The time interval (in milliseconds) between commits.
            After this time, the updates received by the connector are committed and
            added to Pathway's computation graph.
        json_field_paths: For the ``"json"`` format, this allows mapping field names to
            paths within the JSON structure. Use the format ``<field_name>: <path>``
            where the path follows the
            `JSON Pointer (RFC 6901) <https://www.rfc-editor.org/rfc/rfc6901>`_.
        jetstream_stream_name: if specified, the
            `JetStream <https://docs.nats.io/nats-concepts/jetstream>`_ extension is used.
            In this case, the specified stream will be used.
        durable_consumer_name: the name of the durable pull consumer to use with JetStream.
            If not specified, the consumer will be created automatically with default
            settings. The consumer's name will remain the same unless the set of input
            sources is changed.
        parallel_readers: The number of reader instances running in parallel. If not
            specified, it defaults to ``min(pathway_threads, total_partitions)``. It
            can't exceed the number of Pathway engine threads and will be reduced if
            necessary.
        name: A unique name for the connector. If provided, this name will be used in
            logs and monitoring dashboards. Additionally, if persistence is enabled, it
            will be used as the name for the snapshot that stores the connector's progress.
        max_backlog_size: Limit on the number of entries read from the input source and kept
            in processing at any moment. Reading pauses when the limit is reached and resumes
            as processing of some entries completes. Useful with large sources that
            emit an initial burst of data to avoid memory spikes.
        debug_data: Static data replacing original one when debug mode is active.

    Returns:
        Table: The table read.

    Example:

    To run local tests, you can download the ``nats-server`` binary from the
    `Releases page <https://github.com/nats-io/nats-server/releases>`_ and start it. By
    default, it runs on port ``4222`` at ``localhost``.

    If your NATS server is running on ``localhost`` using the default port, you can
    stream the ``"data"`` topic to a Pathway table like this:

    >>> import pathway as pw
    >>> table = pw.io.nats.read("nats://127.0.0.1:4222", "data")

    Keep in mind that NATS doesn't normally store messages. So, make sure to start your
    Pathway program before sending any messages.

    You can also parse messages as UTF-8 during reading by using the ``"format"`` parameter.
    Here's how the reading process would look:

    >>> table = pw.io.nats.read("nats://127.0.0.1:4222", "data", format="plaintext")

    Alternatively, you can read and parse a JSON table during the reading process by
    using the ``"json"`` format and the ``schema`` parameter.

    For example, if your data is in JSON format with three fields - an integer ``user_id``
    (which you'd like to use as the primary key instead of an autogenerated one), and
    two string fields ``username`` and ``phone`` - you can define the schema like this:

    >>> class InputSchema(pw.Schema):
    ...     user_id: int = pw.column_definition(primary_key=True)
    ...     username: str
    ...     phone: str

    Now, you can use the ``format`` and ``schema`` parameters of the connector like this:

    >>> table = pw.io.nats.read(
    ...     "nats://127.0.0.1:4222",
    ...     "data",
    ...     format="json",
    ...     schema=InputSchema,
    ... )

    As a result, you will have a table with three columns: ``"user_id"``, ``"username"``, and
    ``"phone"``. The ``"user_id"`` column will also act as the primary key for the Pathway table.

    If you are using the JetStream extension for your NATS connector, you need to provide
    the ``jetstream_stream_name`` field, specifying the name of the persistent data stream.
    The code then looks as follows:

    >>> table = pw.io.nats.read(
    ...     "nats://127.0.0.1:4222",
    ...     "data",
    ...     format="json",
    ...     schema=InputSchema,
    ...     jetstream_stream_name="your_stream_name",
    ... )

    Note that the autogenerated name of the durable consumer won't change. Therefore,
    if you restart your Pathway program, this configuration will only read the new messages
    that were added after the last execution. For example, if two new messages arrived since
    the previous run, only those two messages will be read.

    If desired, you can also specify the name of your own durable consumer by setting
    the ``durable_consumer_name`` field. This allows Pathway to use your existing durable
    consumer instead of creating a new one automatically.

    >>> table = pw.io.nats.read(
    ...     "nats://127.0.0.1:4222",
    ...     "data",
    ...     format="json",
    ...     schema=InputSchema,
    ...     jetstream_stream_name="your_stream_name",
    ...     durable_consumer_name="your_consumer_name",
    ... )
    """

    # A durable consumer is a JetStream concept, so it is meaningless without a stream.
    if durable_consumer_name is not None and jetstream_stream_name is None:
        raise ValueError(
            "Durable consumer name can only be used if JetStream is enabled"
        )

    data_storage = api.DataStorage(
        storage_type="nats",
        path=uri,
        topic=topic,
        parallel_readers=parallel_readers,
        mode=api.ConnectorMode.STREAMING,
        js_stream_name=jetstream_stream_name,
        durable_consumer_name=durable_consumer_name,
    )
    # The engine-level name for the "raw" format is "binary"; other formats map 1:1.
    schema, data_format = construct_schema_and_data_format(
        "binary" if format == "raw" else format,
        schema=schema,
        csv_settings=None,
        json_field_paths=json_field_paths,
    )
    data_source_options = datasource.DataSourceOptions(
        commit_duration_ms=autocommit_duration_ms,
        unique_name=_get_unique_name(name, kwargs),
        max_backlog_size=max_backlog_size,
    )
    return table_from_datasource(
        datasource.GenericDataSource(
            datastorage=data_storage,
            dataformat=data_format,
            data_source_options=data_source_options,
            schema=schema,
            datasource_name="nats",
        ),
        debug_datasource=datasource.debug_datasource(debug_data),
    )


@check_raw_and_plaintext_only_kwargs_for_message_queues
@check_arg_types
@trace_user_frame
def write(
    table: Table,
    uri: str,
    topic: str | ColumnReference,
    *,
    format: Literal["json", "dsv", "plaintext", "raw"] = "json",
    delimiter: str = ",",
    jetstream_stream_name: str | None = None,
    value: ColumnReference | None = None,
    headers: Iterable[ColumnReference] | None = None,
    name: str | None = None,
    sort_by: Iterable[ColumnReference] | None = None,
) -> None:
    """Writes data into the specified NATS topic.

    The produced messages consist of the payload, corresponding to the values of the table
    that are serialized according to the chosen format and two headers: ``pathway_time``,
    corresponding to the processing time of the entry and ``pathway_diff`` that is either ``1``
    or ``-1``. Both header values are provided as UTF-8 encoded strings. If ``headers``
    parameter is used, additional headers can be added to the message.

    There are several serialization formats supported: ``"json"``, ``"dsv"``, ``"plaintext"``
    and ``"raw"``. The format defines how the message is formed. In case of JSON and DSV
    (delimiter separated values), the message is formed in accordance with the respective data format.

    If the selected format is either ``"plaintext"`` or ``"raw"``, you also need to specify
    which column of the table corresponds to the payload of the produced NATS message. It can be
    done by providing ``value`` parameter. In order to output extra values from the table in
    these formats, NATS headers can be used. You can specify the column references in the
    ``headers`` parameter, which leads to serializing the extracted fields into UTF-8
    strings and passing them as additional message headers.

    When using the JetStream extension, you need to specify the name of the stream that
    is used for data persistence. Provide this name in the ``jetstream_stream_name`` field
    to ensure that Pathway writes to the correct persistent stream.

    Args:
        table: The table for output.
        uri: The URI of the NATS server.
        topic: The NATS topic where data will be written. This can be a specific topic name
            or a reference to a column whose values will be used as the topic for each message.
            If using a column reference, the column must contain string values.
        format: format in which the data is put into NATS. Currently ``"json"``,
            ``"plaintext"``, ``"raw"`` and ``"dsv"`` are supported. If the ``"raw"`` format is selected,
            ``table`` must either contain exactly one binary column that will be dumped as it is
            into the message, or the reference to the target binary column must be specified explicitly
            in the ``value`` parameter. Similarly, if ``"plaintext"`` is chosen, the table should consist
            of a single column of the string type.
        delimiter: field delimiter to be used in case of delimiter-separated values format.
        jetstream_stream_name: if specified, the
            `JetStream <https://docs.nats.io/nats-concepts/jetstream>`_ extension is used.
            In this case, the specified stream will be used.
        value: reference to the column that should be used as a payload in
            the produced message in ``"plaintext"`` or ``"raw"`` format. It can be deduced
            automatically if the table has exactly one column. Otherwise it must be specified directly.
        headers: references to the table fields that must be provided as message
            headers. These headers are named in the same way as fields that are forwarded and correspond
            to the string representations of the respective values encoded in UTF-8. Note that
            due to NATS constraints imposed on headers, the binary fields must also be UTF-8
            serializable.
        name: A unique name for the connector. If provided, this name will be used in
            logs and monitoring dashboards.
        sort_by: If specified, the output will be sorted in ascending order based on the
            values of the given columns within each minibatch. When multiple columns are provided,
            the corresponding value tuples will be compared lexicographically.

    Example:

    Assume you have the NATS server running locally on the default port, ``4222``. Let's
    explore a few ways to send the contents of a table to the topic ``test_topic`` on this server.

    First, you'll need to create a Pathway table. You can do this using the ``table_from_markdown``
    method to set up a test table with information about pets and their owners.

    >>> import pathway as pw
    ...
    >>> table = pw.debug.table_from_markdown('''
    ... age | owner | pet
    ... 10  | Alice | dog
    ... 9   | Bob   | cat
    ... 8   | Alice | cat
    ... ''')

    To output the table's contents in JSON format, use the connector like this:

    >>> pw.io.nats.write(
    ...     table,
    ...     "nats://127.0.0.1:4222",
    ...     "test_topic",
    ...     format="json",
    ... )

    In this case, the output will include the table's rows in JSON format, with ``time``
    and ``diff`` fields added to each JSON payload.

    You can also use a single column from the table as the payload. For instance, to use
    the ``owner`` column as the NATS message payload, implement it as follows:

    >>> pw.io.nats.write(
    ...     table,
    ...     "nats://127.0.0.1:4222",
    ...     "test_topic",
    ...     format="plaintext",
    ...     value=table.owner,
    ... )

    If needed, you can also send the remaining fields as headers. To do this, modify the
    code to use the ``headers`` field, which should include all the required fields.
    Since ``owner`` is already being sent as the message payload, you can add the
    ``age`` and ``pet`` columns to the headers. Here's what the code would look like:

    >>> pw.io.nats.write(
    ...     table,
    ...     "nats://127.0.0.1:4222",
    ...     "test_topic",
    ...     format="plaintext",
    ...     value=table.owner,
    ...     headers=[table.age, table.pet],
    ... )

    If you are using JetStream, you need to specify an additional parameter
    ``jetstream_stream_name``, where you indicate the name of the existing
    stream in the JetStream.

    >>> pw.io.nats.write(
    ...     table,
    ...     "nats://127.0.0.1:4222",
    ...     "test_topic",
    ...     jetstream_stream_name="your_stream_name",
    ...     format="plaintext",
    ...     value=table.owner,
    ...     headers=[table.age, table.pet],
    ... )
    """
    # ``topic`` is either a static topic name (str) or a per-row column reference;
    # resolve which one it is once and use the flag below for both branches.
    topic_is_column = isinstance(topic, ColumnReference)
    output_format = MessageQueueOutputFormat.construct(
        table,
        format=format,
        delimiter=delimiter,
        value=value,
        headers=headers,
        topic_name=topic if topic_is_column else None,
    )
    # ``construct`` may have extended/reordered the table's columns to include
    # headers and the topic column; use the prepared table from here on.
    table = output_format.table

    data_storage = api.DataStorage(
        storage_type="nats",
        path=uri,
        topic=None if topic_is_column else topic,
        topic_name_index=output_format.topic_name_index,
        header_fields=list(output_format.header_fields.items()),
        js_stream_name=jetstream_stream_name,
    )

    table.to(
        datasink.GenericDataSink(
            data_storage,
            output_format.data_format,
            datasink_name="nats",
            unique_name=name,
            sort_by=sort_by,
        )
    )
